package com.kuang.utils.html;

/**
 * @author xuke
 * @description HtmlUtils: helpers for HTML entity encoding and decoding
 * @date 2021/1/17
 */
public class HtmlUtils {
    /**
     * Encodes HTML special characters into entities.
     */
    public static String htmlEncode(String txt) {
        if (null != txt) {
            // Escape "&" first, then collapse entities that were already encoded in the
            // input (e.g. a literal "&quot;" would otherwise become "&amp;quot;").
            // Note: ">" is deliberately left unescaped (the last replace is commented out).
            txt = txt.replace("&", "&amp;").replace("&amp;amp;", "&amp;").replace("&amp;quot;", "&quot;")
                    .replace("\"", "&quot;").replace("&amp;lt;", "&lt;").replace("<", "&lt;")
                    .replace("&amp;gt;", "&gt;").replace("&amp;nbsp;", "&nbsp;");//.replace(">", "&gt;");
        }
        return txt;
    }

    /**
     * Decodes common HTML entities back into plain characters.
     */
    public static String unHtmlEncode(String txt) {
        if (null != txt) {
            // Note: decoding "&amp;" first means doubly-encoded input such as
            // "&amp;lt;" decodes all the way down to "<".
            txt = txt.replace("&amp;", "&").replace("&quot;", "\"").replace("&lt;", "<").replace("&gt;", ">")
                    .replace("&nbsp;", " ");
        }
        return txt;
    }

    public static void main(String[] args){
      System.out.println(unHtmlEncode("# I. Introduction\n" +
              "\n" +
              "## 1. What is Kafka\n" +
              "\n" +
              "Kafka is a distributed, partitioned, replicated message system coordinated through ZooKeeper; it can process large volumes of data in real time.\n" +
              "\n" +
              "## 2. Mainstream message queues (middleware)\n" +
              "\n" +
              "+ RabbitMQ: written in Erlang. Comparatively low throughput and hard to extend further.\n" +
              "+ RocketMQ: written in Java by Alibaba. Community activity is low, so if maintenance stops your company may have to take it over.\n" +
              "+ Redis: as a message queue it is fast for payloads under 10 KB but becomes very slow for large ones.\n" +
              "+ Kafka: an Apache project written in Scala and Java, **well suited to big-data real-time computation and log collection**.\n" +
              "\n" +
              "## 3. Why use a message queue\n" +
              "\n" +
              "Mainly for **task buffering** and **peak shaving**: upstream traffic can spike beyond what downstream services can handle, or downstream may simply lack machines. Kafka acts as a buffer in between, holding messages in the cluster so downstream services can process them at their own pace.\n" +
              "\n" +
              "\n" +
              "## 4. Application scenarios\n" +
              "\n" +
              "In user management, after a successful database write the callback must do two things:\n" +
              "\n" +
              "+ send a registration email\n" +
              "\n" +
              "\n" +
              "+ send a registration SMS\n" +
              "\n" +
              "With a message queue, the service responds to the user as soon as the async notification arrives and leaves these two jobs in the queue to be handled later.\n" +
              "\n" +
              "Serial task processing\n" +
              "\n" +
              "\n" +
              "Parallel task processing (multithreading)\n" +
              "\n" +
              "\n" +
              "\n" +
              "Kafka message-queue processing\n" +
              "\n" +
              "\n" +
              "\n" +
              "## 5. Kafka's distributed implementation\n" +
              "\n" +
              "Kafka uses ZooKeeper as its NameServer.\n" +
              "\n" +
              "## 6. Kafka features\n" +
              "\n" +
              "Decoupling, high throughput, low latency, high concurrency, fault tolerance, scalability, durability, and reliability.\n" +
              "\n" +
              "+ Decoupling: with Kafka, message processors (consumers) and publishers (producers) have no direct dependency on each other.\n" +
              "\n" +
              "+ High throughput, low latency: Kafka handles hundreds of thousands of messages per second with latency down to a few milliseconds. A topic can have multiple partitions, and a consumer group reads partitions in parallel.\n" +
              "\n" +
              "+ Scalability: a Kafka cluster supports hot scaling.\n" +
              "\n" +
              "+ Durability and reliability:\n" +
              "\n" +
              "  Messages are persisted to local disk, and replication guards against data loss.\n" +
              "\n" +
              "+ Fault tolerance: cluster nodes may fail; the cluster keeps working as long as one node remains.\n" +
              "\n" +
              "+ High concurrency: supports reads and writes from thousands of clients.\n" +
              "\n" +
              "## 7. Messaging models\n" +
              "\n" +
              "+ Point-to-point: a queue can have multiple consumers, but each message is consumed by only one of them.\n" +
              "\n" +
              "+ Publish/subscribe\n" +
              "\n" +
              "  Messages are persisted to a topic; consumers can read all of a topic's data, the same record can be consumed by multiple consumers, and data is not deleted after consumption.\n" +
              "\n" +
              "# II. Kafka architecture\n" +
              "\n" +
              "## 1. What components make up Kafka\n" +
              "\n" +
              "\n" +
              "\n" +
              "![](picture\\kafka逻辑架构.png)\n" +
              "\n" +
              "\n" +
              "\n" +
              "### 1.1 topic:\n" +
              "\n" +
              "+ What is it? A collection of messages made up of multiple partitions.\n" +
              "\n" +
              "+ Partitions: why partition a topic?\n" +
              "  + From the producer's view, partitions are spread across brokers, which **eases capacity expansion** and improves **throughput** and **load balancing**\n" +
              "  + From the consumer's view, a given consumer in a group consumes only one partition, so partitioning **raises parallelism and greatly improves efficiency**. It also means **the number of consumers in a group must not exceed the topic's partition count**\n" +
              "\n" +
              "+ Replication\n" +
              "\n" +
              "What is a replica?\n" +
              "\n" +
              "Leader replica: serves reads and writes; **a partition has exactly one leader replica**\n" +
              "\n" +
              "Follower replica: only passively **replicates** the leader's data and serves no reads or writes; it exists to **improve partition availability**\n" +
              "\n" +
              "### 1.2 producer:\n" +
              "\n" +
              "Publishes messages to topics.\n" +
              "\n" +
              "### 1.3 consumer:\n" +
              "\n" +
              "Subscribes to topics, reading and processing their messages.\n" +
              "\n" +
              "### 1.4 broker:\n" +
              "\n" +
              "A server that stores and manages topic messages.\n" +
              "\n" +
              "## 2. What are ISR and AR, and what does ISR expansion/shrinkage mean?\n" +
              "\n" +
              "+ ISR: In-sync Replicas. For a partition it holds the ids of the leader and every follower in sync with it, and is maintained by the controller and the leader. A follower whose replication lag exceeds the threshold is evicted from the ISR and its id is placed in the OSR (Out-of-sync Replicas) list.\n" +
              "\n" +
              "+ What is AR?\n" +
              "\n" +
              "  AR = ISR + OSR\n" +
              "\n" +
              "+ ISR expansion/shrinkage: a follower that times out syncing with the leader is removed from the ISR, and rejoins once it has caught up with all of the leader's data.\n" +
              "\n" +
              "## 3. What is the controller, and how is it elected?\n" +
              "\n" +
              "It is just a broker. The first broker started in the cluster tries to become the controller by creating the ephemeral node /controller in ZooKeeper; the other brokers are notified when that node disappears and each tries to recreate /controller, but only one succeeds and the rest keep watching the node.\n" +
              "\n" +
              "Role: it watches broker id changes to implement two functions:\n" +
              "\n" +
              "+ Topic partition/replica assignment: for load balancing a topic is split into multiple partitions; this partition metadata and its mapping to brokers are maintained by the controller\n" +
              "\n" +
              "+ Partition leader election\n" +
              "\n" +
              "## 4. Partition leader election\n" +
              "\n" +
              "If the broker acting as leader dies, the controller removes it from the ISR and picks as new leader the ISR replica most in sync with the old leader; if all are fully in sync, the first in order is chosen.\n" +
              "\n" +
              "# III. Environment setup\n" +
              "\n" +
              "## 1. Installing Kafka\n" +
              "\n" +
              "+ Download: http://kafka.apache.org/downloads.html\n" +
              "+ Baidu link: https://pan.baidu.com/s/1cFvOG7wfZYOwwYO2bHqang \n" +
              "  extraction code: vpho\n" +
              "\n" +
              "\n" +
              "+ Extract: tar -zxvf kafka/kafka.tar.gz -C /opt/module/\n" +
              "\n" +
              "+ In the Kafka directory create a directory for logs and message data: mkdir logs\n" +
              "\n" +
              "+ Edit the config file: vim config/server.properties\n" +
              "  + broker.id=0\n" +
              "  + uncomment delete.topic.enable=true so topics can be deleted\n" +
              "  + log.dirs=/opt/module/kafka/logs\n" +
              "  + zookeeper.connect=192.168.184.100:2181\n" +
              "  + listeners = PLAINTEXT://192.168.184.100:9092\n" +
              "    host.name=192.168.184.100, where the ip is the current VM's ip\n" +
              "\n" +
              "+ Clone two more VMs and change their ips to 101 and 102\n" +
              "\n" +
              "  https://blog.csdn.net/wlxs32/article/details/105232268 (note: delete the logs directory inside Kafka on each cloned VM)\n" +
              "\n" +
              "\n" +
              "## 2. Startup\n" +
              "\n" +
              "### 1. Start ZooKeeper\n" +
              "\n" +
              "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start\n" +
              "\n" +
              "### 2. Start and stop the Kafka service\n" +
              "\n" +
              "Run the commands below on every VM:\n" +
              "\n" +
              "+ Foreground: /opt/module/kafka/bin/kafka-server-start.sh  /opt/module/kafka/config/server.properties\n" +
              "\n" +
              "\n" +
              "+ Background: /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties\n" +
              "+ Check Java processes with jps; if kafka is listed, startup succeeded\n" +
              "+ Shutdown:\n" +
              "  + netstat -alnp | grep 9092, then kill -9 the pid\n" +
              "  + bin/kafka-server-stop.sh; if permission is denied, run chmod 777 kafka-server-stop.sh\n" +
              "\n" +
              "# IV. Shell commands\n" +
              "\n" +
              "Stop: /opt/module/kafka/bin/kafka-server-stop.sh\n" +
              "\n" +
              "Start: /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties\n" +
              "\n" +
              "## 1. Create a topic\n" +
              "\n" +
              "/opt/module/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.184.100:2181 --partitions 2 --replication-factor 2 --topic first\n" +
              "\n" +
              "This creates the topic directories first-0 and first-1 under the logs directory.\n" +
              "\n" +
              "## 2. List the topics in the cluster\n" +
              "\n" +
              "/opt/module/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.184.100:2181\n" +
              "\n" +
              "## 3. Describe a topic\n" +
              "\n" +
              " /opt/module/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.184.100:2181 --topic first\n" +
              "\n" +
              "+ Can Kafka run without ZooKeeper? What is zk used for?\n" +
              "\n" +
              "  + Tracking brokers joining and leaving the cluster: when a broker comes online its brokerId is written under zk's /brokers/ids node, and it is removed from zk when the broker goes offline\n" +
              "\n" +
              "  + Controller election: zk elects one broker in the cluster as controller\n" +
              "\n" +
              "    + What is the controller for?\n" +
              "\n" +
              "      It watches broker id changes to implement two functions:\n" +
              "\n" +
              "      + Topic partition/replica assignment: for load balancing a topic is split into multiple partitions; this partition metadata and its mapping to brokers are maintained by the controller\n" +
              "\n" +
              "      + Partition leader election\n" +
              "\n" +
              "\n" +
              "## 4. Delete a topic\n" +
              "\n" +
              " /opt/module/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.184.100:2181 --topic first\n" +
              "\n" +
              "Note: the Kafka service must be running on every host that owns one of the topic's partitions for the topic to be fully deleted.\n" +
              "\n" +
              "## 5. Change a topic's partition count\n" +
              "\n" +
              "bin/kafka-topics.sh --zookeeper 192.168.184.100:2181 --alter --partitions 3 --topic first\n" +
              "\n" +
              "Note: the partition count can only be increased, never decreased\n" +
              "\n" +
              "## 6. Console producer\n" +
              "\n" +
              "bin/kafka-console-producer.sh --broker-list 192.168.184.101:9092 --topic first\n" +
              "\n" +
              "## 7. Console consumer\n" +
              "\n" +
              "+ Old style: bin/kafka-console-consumer.sh --zookeeper 192.168.184.100:2181 --from-beginning --topic first\n" +
              "\n" +
              "  Question: what is the offset for?\n" +
              "\n" +
              "Every message has a sequential id called the offset, which uniquely identifies each message within its partition\n" +
              "\n" +
              "+ New style\n" +
              "\n" +
              "  bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.102:9092 --from-beginning --topic first\n" +
              "\n" +
              "  Offsets are no longer kept in zk but in a topic named **__consumer_offsets**, visible via bin/kafka-topics.sh --list --zookeeper 192.168.184.100:2181\n" +
              "\n" +
              "# V. Data flow\n" +
              "\n" +
              "## 1. Produce flow\n" +
              "\n" +
              "+ The producer obtains a partition's leader from the broker list\n" +
              "\n" +
              "  e.g. fetch partition 0's replica state from ZooKeeper: get /brokers/topics/first/partitions/0/state\n" +
              "\n" +
              "+ The producer sends the message to that partition's leader\n" +
              "\n" +
              "+ The leader writes the message into its local logs directory\n" +
              "\n" +
              "+ The partition's followers pull the message from the leader, write it to their own local logs, then send an ack (acknowledge character) to the leader\n" +
              "\n" +
              "+ Once the leader has received acks from all followers, it acks the producer\n" +
              "\n" +
              "## 2. Consume flow\n" +
              "\n" +
              "+ Messages are consumed in pull mode: each consumer tracks its own progress (writing its offset to zk or Kafka itself) and reads each partition's messages in order, independently of other consumers. If the broker has no data, the poll waits up to a timeout before returning.\n" +
              "\n" +
              "+ Partition consumption\n" +
              "\n" +
              "  A consumer in a group consumes only one partition at a time; if a topic has more partitions than the group has consumers, partitions are by default consumed in **round-robin** order.\n" +
              "\n" +
              "## 3. Persistence\n" +
              "\n" +
              "### 1. Storage layout:\n" +
              "\n" +
              "A topic is stored in one or more partitions; each partition maps to a directory that holds its messages and index files.\n" +
              "\n" +
              "### 2. Retention policies\n" +
              "\n" +
              "Kafka keeps every message whether or not it has been consumed; two policies govern deleting old data\n" +
              "\n" +
              "+ Time-based: log.retention.hours=168, the default (7 days)\n" +
              "+ Size-based: log.retention.bytes=1073741824 (1 GB); if a partition grows beyond 1 GB, the oldest segment is deleted at a suitable time\n" +
              "\n" +
              "### 3. How is data reliability guaranteed?\n" +
              "\n" +
              "**With acks=all plus the idempotence mechanism**\n" +
              "\n" +
              "Reliability means controlling the loss rate, the duplication rate, and replica consistency.\n" +
              "\n" +
              "+ The three ack levels control the trade-off between loss and duplication\n" +
              "\n" +
              "  + 0: the producer fires the data off and **waits for no response at all**. No duplicates and high efficiency, but successful delivery is not guaranteed\n" +
              "\n" +
              "  + 1: the data is sent to the leader and written to the leader's log, and the ack is returned **without waiting for the followers**. Data may be duplicated on retry,\n" +
              "\n" +
              "    and if the leader dies after acking but before followers sync, the data is lost.\n" +
              "\n" +
              "  + all: the ack is returned only after **the leader and all replicas have written the data**; otherwise the producer resends.\n" +
              "\n" +
              "    Data is never lost, at the cost of possible duplicates.\n" +
              "\n" +
              "    + Why duplicates?\n" +
              "\n" +
              "      If the data reached every replica but the leader died before responding to the producer, the producer assumes the write failed and sends it again.\n" +
              "\n" +
              "    + How does Kafka deduplicate internally?\n" +
              "\n" +
              "      + **Use acks=all plus idempotence**, i.e. set enable.idempotence to true.\n" +
              "      + How it works: Kafka assigns each producer a unique id called the pid; when sending, the producer builds a **message id (pid + sequence number)**. The leader caches the id of every message it receives and checks each new message for a **duplicate id, dropping it if so**.\n" +
              "\n" +
              "+ How is replica consistency guaranteed?\n" +
              "\n" +
              "  Through LEO and HW.\n" +
              "\n" +
              "  + LEO: the offset of the next message to be written in the replica\n" +
              "\n" +
              "  + HW: the high watermark, in practice the minimum LEO across the brokers in the ISR; consumers can only fetch messages before this offset.\n" +
              "\n" +
              "    What is the HW for?\n" +
              "\n" +
              "    + A crashed follower is removed from the ISR; on recovery it reads the HW recorded before the crash, discards all data at or above that HW, and re-syncs from the leader.\n" +
              "      + Why discard data past the HW? While the follower was down, the leader may also have died and a new leader with a lower LEO may have been elected and accepted fresh data; unless the follower drops part of its log, there is no well-defined point from which to resume syncing.\n" +
              "    + When the leader dies, a new leader is elected from the ISR; to keep replicas consistent, the remaining followers (not the new leader) first truncate their log files above the HW and then sync from the new leader.\n" +
              "\n" +
              "## 4. Why are writes efficient?\n" +
              "\n" +
              "Very efficient, because Kafka writes sequentially, appending into contiguous space; sequential disk writes can even beat random memory writes\n" +
              "\n" +
              "## 5. How are consumer reads kept efficient?\n" +
              "\n" +
              "+ Partitioning: multiple consumers in a group read different partitions concurrently\n" +
              "\n" +
              "+ Segmentation: data files are split into segments. For example, 100 messages with offsets 0 to 99 split into 5 segments: the first segment holds messages 0-19 in file 0.log, the second holds 20-39 in 20.log, and so on.\n" +
              "\n" +
              "+ Sparse index\n" +
              "\n" +
              "  For a subset of the messages in each segment file, an index file is built, named after the segment with the suffix .index; each entry has two parts:\n" +
              "\n" +
              "  (1) the message's relative offset\n" +
              "\n" +
              "  (2) the message's position: the physical offset from the start of the file.\n" +
              "\n" +
              "\n" +
              "## 6. Which partition does the producer send to? How is the partition chosen?\n" +
              "\n" +
              "+ A partitioner is configured: messages go to whatever partition it picks, p.put(&quot;partitioner.class&quot;,&quot;xxx&quot;)\n" +
              "\n" +
              "+ No partitioner, but a key is given: the target partition is the key's hash modulo the partition count, e.g. new ProducerRecord&lt;String, String>(&quot;first&quot;,&quot;abc&quot;,&quot;Hello&quot; + i), where abc is the key\n" +
              "\n" +
              "+ Neither key nor partitioner: older versions default to round-robin.\n" +
              "\n" +
              "+ Configuration\n" +
              "\n" +
              "  ```xml\n" +
              "  &lt;dependencies>\n" +
              "      &lt;dependency>\n" +
              "          &lt;groupId>org.apache.kafka&lt;/groupId>\n" +
              "          &lt;artifactId>kafka_2.12&lt;/artifactId>\n" +
              "          &lt;version>0.11.0.3&lt;/version>\n" +
              "      &lt;/dependency>\n" +
              "  &lt;/dependencies>\n" +
              "  ```\n" +
              "  \n" +
              "  ```properties\n" +
              "  #cfg.properties\n" +
              "  bootstrap.servers=192.168.184.100:9092\n" +
              "  #size of one batch\n" +
              "  batch.size=16384\n" +
              "  #accumulator buffer size\n" +
              "  buffer.memory=33554432\n" +
              "  #linger time: after this delay a batch is sent even if the accumulator is not full\n" +
              "  linger.ms=1\n" +
              "  #The producer must serialize keys and values into byte arrays (Serializer) before sending them to Kafka.\n" +
              "  #The consumer uses a Deserializer to turn the byte arrays received from Kafka back into objects.\n" +
              "  #The k/v serializer types are specified as class paths.\n" +
              "  key.serializer=org.apache.kafka.common.serialization.StringSerializer\n" +
              "  value.serializer=org.apache.kafka.common.serialization.StringSerializer\n" +
              "  partitioner.class=hy.MyPartition\n" +
              "  retries=0\n" +
              "  ```\n" +
              "\n" +
              "## 7. Producer interceptors\n" +
              "\n" +
              "Give you a chance to customize messages before they are sent and before the producer's callback logic runs\n" +
              "\n" +
              "How to implement one?\n" +
              "\n" +
              "+ Define a class implementing the ProducerInterceptor interface:\n" +
              "\n" +
              "  ```java\n" +
              "  public class MyInterCeptor implements ProducerInterceptor&lt;String,String>{\n" +
              "      long sucNum;\n" +
              "      long errNum;\n" +
              "      //called by the producer before each record is sent\n" +
              "      public ProducerRecord&lt;String, String> onSend(ProducerRecord&lt;String, String> r) {\n" +
              "          return new ProducerRecord&lt;String,String>(r.topic(),r.partition(),r.timestamp(),r.key(),System.currentTimeMillis()+r.value(),r.headers());\n" +
              "      }\n" +
              "\n" +
              "      //called when Kafka acks the record\n" +
              "      public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {\n" +
              "          if(e==null){\n" +
              "              sucNum++;\n" +
              "          }else{\n" +
              "              errNum++;\n" +
              "          }\n" +
              "      }\n" +
              "      //called when the Kafka producer is closed\n" +
              "      public void close() {\n" +
              "          System.out.println(&quot;sent successfully: &quot;+sucNum+&quot;, failed: &quot;+errNum);\n" +
              "      }\n" +
              "      //called when the configuration is read\n" +
              "      public void configure(Map&lt;String, ?> map) {\n" +
              "          System.out.println(map);\n" +
              "      }\n" +
              "  }\n" +
              "  ```\n" +
              "\n" +
              "+ Register the interceptor in the configuration.\n" +
              "\n" +
              "\n" +
              "```java\n" +
              "p.put(&quot;interceptor.classes&quot;,&quot;hzn.MyInterCeptor&quot;);\n" +
              "```\n" +
              "\n" +
              "## 8. Consumer\n" +
              "\n" +
              "```java\n" +
              "public class MyConsumer {\n" +
              "    public static void main(String[] args) {\n" +
              "        Properties p=new Properties();\n" +
              "        p.put(&quot;bootstrap.servers&quot;,&quot;192.168.184.100:9092&quot;);\n" +
              "        p.put(&quot;group.id&quot;,&quot;gr_01&quot;);\n" +
              "        p.put(&quot;enable.auto.commit&quot;,&quot;false&quot;);//disable auto-commit\n" +
              "\n" +
              "        p.put(&quot;key.deserializer&quot;, StringDeserializer.class.getName());\n" +
              "        p.put(&quot;value.deserializer&quot;, StringDeserializer.class.getName());\n" +
              "        p.put(&quot;auto.offset.reset&quot;,&quot;earliest&quot;);//consume from the beginning\n" +
              "\n" +
              "        KafkaConsumer&lt;String,String> consumer=new KafkaConsumer&lt;String, String>(p);\n" +
              "        consumer.subscribe(Arrays.asList(&quot;first&quot;));//subscribe once, outside the poll loop\n" +
              "        while(true){\n" +
              "            ConsumerRecords&lt;String,String> cr=consumer.poll(100);\n" +
              "            for(ConsumerRecord&lt;String,String> r:cr){\n" +
              "                System.out.println(&quot;topic: &quot;+r.topic()+&quot;, partition: &quot;+r.partition()+&quot;, value: &quot;+r.value());\n" +
              "            }\n" +
              "            //manual commit\n" +
              "            consumer.commitAsync();\n" +
              "        }\n" +
              "\n" +
              "        /*\n" +
              "         *async: commitAsync returns right after submitting, without blocking\n" +
              "         *sync: commitSync blocks until Kafka confirms the commit\n" +
              "         */\n" +
              "    }\n" +
              "}\n" +
              "```\n" +
              "\n" +
              "## 9. Partitioner\n" +
              "\n" +
              "```java\n" +
              "public class MyPartition implements Partitioner{\n" +
              "    Random random=new Random();\n" +
              "    int p=-1;\n" +
              "    @Override\n" +
              "    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {\n" +
              "        System.out.println(&quot;s=&quot;+s);\n" +
              "        int part=random.nextInt(2);\n" +
              "        //return part; //random assignment (unused)\n" +
              "        p++; //cycle over the two partitions: 0,1,0,1,...\n" +
              "        if(p==2){\n" +
              "            p=0;\n" +
              "        }\n" +
              "        return p;\n" +
              "    }\n" +
              "\n" +
              "    @Override\n" +
              "    public void close() {\n" +
              "\n" +
              "    }\n" +
              "\n" +
              "    @Override\n" +
              "    public void configure(Map&lt;String, ?> map) {\n" +
              "\n" +
              "    }\n" +
              "}\n" +
              "\n" +
              "```\n" +
              "\n" +
              "## 10. Producer\n" +
              "\n" +
              "```java\n" +
              "public class MyProducer {\n" +
              "    public static void main(String[] args) throws Exception{\n" +
              "        Properties p = new Properties();\n" +
              "        //p.put(&quot;bootstrap.servers&quot;,&quot;192.168.184.100:9092&quot;);\n" +
              "        p.put(&quot;acks&quot;,&quot;all&quot;);\n" +
              "        p.put(&quot;enable.idempotence&quot;,true);//idempotence\n" +
              "        p.load(MyProducer.class.getClassLoader().getResourceAsStream(&quot;cfg.properties&quot;));\n" +
              "\n" +
              "        Producer&lt;String, String> producer = new KafkaProducer&lt;String, String>(p);\n" +
              "        for (int i = 0; i &lt; 10; i++) {\n" +
              "            //producer.send(new ProducerRecord&lt;String, String>(&quot;first&quot;,&quot;hello_&quot;+i));\n" +
              "            //producer.send(new ProducerRecord&lt;String, String>(&quot;first&quot;,&quot;abc&quot;,&quot;hello_&quot;+i));\n" +
              "            producer.send(new ProducerRecord&lt;String, String>(&quot;first&quot;, &quot;abc&quot;, &quot;hello_&quot; + i), (m, e) -> {\n" +
              "                //called when the ack arrives, and also when a send fails for good\n" +
              "                if(e==null){\n" +
              "                    //send succeeded\n" +
              "                    System.out.println(&quot;partition: &quot;+m.partition()+&quot;, offset: &quot;+m.offset());\n" +
              "                }else {\n" +
              "                    e.printStackTrace();\n" +
              "                    System.out.println(&quot;send failed&quot;);\n" +
              "                }\n" +
              "            }).get();//note: get() blocks the send until the ack callback has run\n" +
              "        }\n" +
              "        producer.close();\n" +
              "    }\n" +
              "}\n" +
              "\n" +
              "```\n" +
              "\n" +
              "\n" +
              "\n" +
              "# VI. Interview questions\n" +
              "\n" +
              "## 1. How does Kafka provide message ordering?\n" +
              "\n" +
              "Ordering is maintained within a partition via offsets; across partitions it cannot be guaranteed.\n" +
              "\n" +
              "## 2. In which directory are newly created partitions placed?\n" +
              "\n" +
              "Under the logs directory set in the config file: first-0, first-1\n" +
              "\n" +
              "## 3. Do you know Kafka's partitioner, serializer, and interceptors? In what order do they run?\n" +
              "\n" +
              "+ Before sending, the message first passes through the interceptors\n" +
              "+ then the serializer converts it into a byte array\n" +
              "+ then the partitioner assigns the partition\n" +
              "\n" +
              "## 4. What causes duplicate consumption? What causes missed messages?\n" +
              "\n" +
              "+ Duplicate consumption?\n" +
              "\n" +
              "  + Producer side: with **acks=all** the producer may resend messages, which are then consumed twice; solvable with idempotence\n" +
              "  + Consumer side: with the **consume first, commit the offset afterwards** pattern, if the offset commit fails after processing, the same record is consumed again. Committing offsets synchronously helps.\n" +
              "\n" +
              "+ Missed messages?\n" +
              "\n" +
              "  + Producer side: with acks=0 or 1, data the producer sent may never be stored by Kafka, so it is never consumed.\n" +
              "  + Consumer side: committing the offset first and consuming afterwards can skip messages; consume first, then commit\n" +
              "\n" +
              "\n" +
              "## 5. Does Kafka have internal topics? What are they for?\n" +
              "\n" +
              "__consumer_offsets, which records the consumed offsets of every partition of every topic\n" +
              "\n" +
              "## 6. How do you ensure a message is consumed exactly once?\n" +
              "\n" +
              "+ producer side\n" +
              "\n" +
              "  + acks=all\n" +
              "\n" +
              "  + ```java\n" +
              "    p.put(&quot;enable.idempotence&quot;,true);//idempotence\n" +
              "    ```\n" +
              "\n" +
              "\n" +
              "+ consumer side\n" +
              "  + disable auto-commit\n" +
              "  + commit manually after each round of consumption\n" +
              "\n" +
              "\n" +
              "\n" +
              "\n" +
              "\n" +
              "\n"));
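      // Illustrative round-trip check with a hypothetical input: htmlEncode escapes
      // "&", '"' and "<" (">" is intentionally left alone), and unHtmlEncode reverses
      // it, so encode-then-decode returns the original text.
      String raw = "<b>\"Tom & Jerry\"</b>";
      System.out.println(htmlEncode(raw));                // &lt;b>&quot;Tom &amp; Jerry&quot;&lt;/b>
      System.out.println(unHtmlEncode(htmlEncode(raw)));  // <b>"Tom & Jerry"</b>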
    }
}
